#! /usr/bin/env python3
# pyright: strict

# --------------------------------------------------------------------
# --- Cachegrind's merger.                             cg_merge.in ---
# --------------------------------------------------------------------

# This file is part of Cachegrind, a high-precision tracing profiler
# built with Valgrind.
#
# Copyright (C) 2002-2023 Nicholas Nethercote
#    njn@valgrind.org
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License as
# published by the Free Software Foundation; either version 2 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, see <http://www.gnu.org/licenses/>.
#
# The GNU General Public License is contained in the file COPYING.

# This script merges Cachegrind output files.
#
# Use `make pymerge` to "build" this script every time it is changed. This runs
# the formatters, type-checkers, and linters on `cg_merge.in` and then
# generates `cg_merge`.
#
# This is a cut-down version of `cg_annotate.in`.

from __future__ import annotations

import re
import sys
from argparse import ArgumentParser, Namespace
from collections import defaultdict
from typing import DefaultDict, NoReturn, TextIO


# A typed wrapper for parsed args.
class Args(Namespace):
    # None of these fields are modified after arg parsing finishes.

    # Destination given with `-o`. argparse leaves this as `None` when `-o` is
    # absent, in which case the merged output goes to stdout.
    output: str | None

    # The cgout files to merge; `nargs="+"` guarantees at least one.
    cgout_filename: list[str]

    # Builds the command-line parser, runs it on the process's command line,
    # and returns the populated `Args`. On a usage error, or for `--help` or
    # `--version`, argparse prints a message and exits the program.
    @staticmethod
    def parse() -> Args:
        parser = ArgumentParser(
            description=(
                "Merge multiple Cachegrind output files. Deprecated; use "
                "`cg_annotate` with multiple Cachegrind output files instead."
            )
        )

        parser.add_argument(
            "--version",
            action="version",
            version="%(prog)s-@VERSION@",
        )

        parser.add_argument(
            "-o",
            dest="output",
            type=str,
            metavar="FILE",
            help="output file (default: stdout)",
        )

        parser.add_argument(
            "cgout_filename",
            nargs="+",
            metavar="cachegrind-out-file",
            help="file produced by Cachegrind",
        )

        # `parse_args` fills in the namespace object it is given, so we keep
        # our own typed reference to it and return that.
        parsed = Args()
        parser.parse_args(namespace=parsed)
        return parsed


# Args are stored in a global for easy access. Note that this statement runs
# when the module is loaded, so the command line is parsed (and argparse may
# exit the program) before `main` is called.
args = Args.parse()


# One instance of this class is constructed per cgout file, from the `events:`
# line in that file.
class Events:
    # The event names, in the order they appear in the text passed to
    # `__init__`.
    events: list[str]

    # The number of event names. Every `Cc` built by this class has this
    # length.
    num_events: int

    # `text` is a whitespace-separated list of event names.
    def __init__(self, text: str) -> None:
        names = text.split()
        self.events = names
        self.num_events = len(names)

    # Converts a list of count strings into a `Cc` of length `num_events`.
    #
    # Raises a `ValueError` exception on syntax error, i.e. if any string is
    # not an integer or if there are more counts than events.
    def mk_cc(self, str_counts: list[str]) -> Cc:
        # `map` is used because it is slightly faster than a list
        # comprehension.
        counts = list(map(int, str_counts))

        shortfall = self.num_events - len(counts)
        if shortfall < 0:
            # More counts than there are events.
            raise ValueError
        if shortfall > 0:
            # Add zeroes at the end for any missing numbers.
            counts.extend([0] * shortfall)

        return counts

    # Returns a fresh all-zero `Cc` of length `num_events`.
    def mk_empty_cc(self) -> Cc:
        # List repetition is much faster than a list comprehension.
        return [0] * self.num_events


# A "cost centre", which is a dumb container for counts. Always the same length
# as `Events.events`, but it doesn't even know event names. `Events.mk_cc` and
# `Events.mk_empty_cc` are used for construction.
#
# This used to be a class with a single field `counts: list[int]`, but this
# type is very hot and just using a type alias is much faster.
#
# Because it is a plain `list`, a `Cc` is mutable; `add_cc_to_cc` relies on
# this to update an accumulator in place.
Cc = list[int]


# Accumulate `a_cc` into `b_cc`, element by element. `b_cc` is modified in
# place and `a_cc` is left untouched. `b_cc` must be at least as long as
# `a_cc`, otherwise an `IndexError` is raised.
def add_cc_to_cc(a_cc: Cc, b_cc: Cc) -> None:
    for pos, addend in enumerate(a_cc):
        b_cc[pos] = b_cc[pos] + addend


# Per-line CCs, organised by filename, function name, and line number. The
# nesting is filename -> function name -> line number -> `Cc`, so a lookup is
# written `d[fl][fn][line_num]`.
DictLineCc = DefaultDict[int, Cc]
DictFnDictLineCc = DefaultDict[str, DictLineCc]
DictFlDictFnDictLineCc = DefaultDict[str, DictFnDictLineCc]


# Reports a fatal error on stderr, prefixed with the program name, and then
# terminates the program with exit status 1. Never returns.
def die(msg: str) -> NoReturn:
    print(f"cg_merge: error: {msg}", file=sys.stderr)
    raise SystemExit(1)


# Reads one cgout file and adds its counts to the `cumul_*` accumulators, which
# are modified in place.
#
# - `cgout_filename`: path of the cgout file to read.
# - `is_first_file`: true for the first file read. In that case the
#   accumulators are put into their proper initial state, using the number of
#   events on this file's `events:` line.
# - `cumul_dict_fl_dict_fn_dict_line_cc`: the per-line CCs accumulated so far.
# - `cumul_summary_cc`: the summary CC accumulated so far.
#
# Returns the file's `desc:` lines, the value of its `cmd:` line, and the
# `Events` built from its `events:` line.
#
# Any failure to open the file, and any syntax error in it, is reported via
# `die`, which exits the program.
def read_cgout_file(
    cgout_filename: str,
    is_first_file: bool,
    cumul_dict_fl_dict_fn_dict_line_cc: DictFlDictFnDictLineCc,
    cumul_summary_cc: Cc,
) -> tuple[list[str], str, Events]:
    # The file format is described in Cachegrind's manual.
    try:
        cgout_file = open(cgout_filename, "r", encoding="utf-8")
    except OSError as err:
        die(f"{err}")

    with cgout_file:
        cgout_line_num = 0

        # Like `die`, but prefixes the message with the current file name and
        # line number.
        def parse_die(msg: str) -> NoReturn:
            die(f"{cgout_file.name}:{cgout_line_num}: {msg}")

        # Reads the next line (an empty string at end of file) and keeps
        # `cgout_line_num` in step for error messages.
        def readline() -> str:
            nonlocal cgout_line_num
            cgout_line_num += 1
            return cgout_file.readline()

        # Read "desc:" lines. There may be none, in which case `desc` stays
        # empty.
        desc: list[str] = []
        while line := readline():
            if m := re.match(r"desc:\s+(.*)", line):
                desc.append(m.group(1))
            else:
                break

        # Read "cmd:" line. (`line` is already set from the "desc:" loop.)
        if m := re.match(r"cmd:\s+(.*)", line):
            cmd = m.group(1)
        else:
            parse_die("missing a `cmd:` line")

        # Read "events:" line.
        line = readline()
        if m := re.match(r"events:\s+(.*)", line):
            events = Events(m.group(1))
        else:
            parse_die("missing an `events:` line")

        def mk_empty_dict_line_cc() -> DictLineCc:
            return defaultdict(events.mk_empty_cc)

        def mk_empty_dict_fn_dict_line_cc() -> DictFnDictLineCc:
            return defaultdict(mk_empty_dict_line_cc)

        summary_cc_present = False

        fl = ""
        fn = ""

        # The `cumul_*` values are passed in by reference and are modified by
        # this function. But they can't be properly initialized until the
        # `events:` line of the first file is read and the number of events is
        # known. So we initialize them in an invalid state, and then
        # reinitialize them properly here, before their first use.
        if is_first_file:
            cumul_dict_fl_dict_fn_dict_line_cc.default_factory = (
                mk_empty_dict_fn_dict_line_cc
            )
            cumul_summary_cc.extend(events.mk_empty_cc())
        elif len(cumul_summary_cc) != events.num_events:
            # The accumulators are sized for the first file's events. If this
            # file has a different number of events, adding its CCs would
            # either index past the end of an accumulator or silently merge
            # mismatched data, so stop before touching them. (Files with the
            # same number of events but different names are rejected by the
            # caller.)
            die("events in data files don't match")

        # Line matching is done in order of pattern frequency, for speed.
        while line := readline():
            if line[0].isdigit():
                split_line = line.split()
                try:
                    line_num = int(split_line[0])
                    cc = events.mk_cc(split_line[1:])
                except ValueError:
                    parse_die("malformed or too many event counts")

                # Record this CC at the file/func/line level.
                add_cc_to_cc(cc, cumul_dict_fl_dict_fn_dict_line_cc[fl][fn][line_num])

            elif line.startswith("fn="):
                # Strip the `fn=` prefix and the trailing newline.
                fn = line[3:-1]

            elif line.startswith("fl="):
                # Strip the `fl=` prefix and the trailing newline.
                fl = line[3:-1]
                # A `fn=` line should follow, overwriting the "???".
                fn = "???"

            elif m := re.match(r"summary:\s+(.*)", line):
                summary_cc_present = True
                try:
                    add_cc_to_cc(events.mk_cc(m.group(1).split()), cumul_summary_cc)
                except ValueError:
                    parse_die("malformed or too many event counts")

            elif line == "\n" or line.startswith("#"):
                # Skip empty lines and comment lines.
                pass

            else:
                parse_die(f"malformed line: {line[:-1]}")

    # Check if summary line was present.
    if not summary_cc_present:
        parse_die("missing `summary:` line, aborting")

    # In `cg_annotate.in` and `cg_diff.in` we check that the file's summary CC
    # matches the totals of the file's individual CCs, but not here. That's
    # because in this script we don't collect the file's CCs in isolation,
    # instead we just add them to the accumulated CCs, for speed. This makes it
    # difficult to do the per-file checking.

    return (desc, cmd, events)


# Reads every cgout file named on the command line, accumulates their counts,
# and writes the merged result in cgout format to the `-o` file, or to stdout
# if `-o` was not given. Exits via `die` if the files' events don't match or if
# the output file can't be written.
def main() -> None:
    desc1: list[str] | None = None
    cmd1 = None
    events1 = None

    # Different places where we accumulate CC data. Initialized to invalid
    # states prior to the number of events being known.
    cumul_dict_fl_dict_fn_dict_line_cc: DictFlDictFnDictLineCc = defaultdict(None)
    cumul_summary_cc: Cc = []

    for n, filename in enumerate(args.cgout_filename):
        is_first_file = n == 0
        (desc_n, cmd_n, events_n) = read_cgout_file(
            filename,
            is_first_file,
            cumul_dict_fl_dict_fn_dict_line_cc,
            cumul_summary_cc,
        )
        # We reuse the description and command from the first file, like the
        # old C version of `cg_merge`.
        if is_first_file:
            desc1 = desc_n
            cmd1 = cmd_n
            events1 = events_n
        else:
            assert events1 is not None
            if events1.events != events_n.events:
                die("events in data files don't match")

    # Writes the merged data to `f` in cgout format: the `desc:`, `cmd:` and
    # `events:` header lines, then the per-line counts grouped under `fl=` and
    # `fn=` lines, then the `summary:` line.
    def write_output(f: TextIO) -> None:
        # These assertions hold because the loop above executes at least once
        # (`cgout_filename` is declared with `nargs="+"`). `desc1` and
        # `events1` are compared against `None` rather than tested for
        # truthiness: a cgout file with no `desc:` lines gives an empty (falsy)
        # `desc1`, which is still valid.
        assert desc1 is not None
        assert events1 is not None
        assert cumul_dict_fl_dict_fn_dict_line_cc is not None
        assert cumul_summary_cc

        for desc_line in desc1:
            print("desc:", desc_line, file=f)
        print("cmd:", cmd1, file=f)
        print("events:", *events1.events, sep=" ", file=f)

        for fl, dict_fn_dict_line_cc in cumul_dict_fl_dict_fn_dict_line_cc.items():
            print(f"fl={fl}", file=f)
            for fn, dict_line_cc in dict_fn_dict_line_cc.items():
                print(f"fn={fn}", file=f)
                for line, cc in dict_line_cc.items():
                    print(line, *cc, file=f)

        print("summary:", *cumul_summary_cc, sep=" ", file=f)

    if args.output:
        # The `try` covers the writes as well as the `open`, so a failure part
        # way through is also reported via `die`.
        try:
            with open(args.output, "w", encoding="utf-8") as f:
                write_output(f)
        except OSError as err:
            die(f"{err}")
    else:
        write_output(sys.stdout)


# Only perform the merge when this file is run as a script.
if __name__ == "__main__":
    main()
